package com.gitee.jastee.kafka.exception;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Demo that reproduces a Kafka {@code CommitFailedException}.
 *
 * <p>Auto-commit is disabled and {@code max.poll.interval.ms} is set low (10s).
 * When the total time spent processing one batch exceeds that interval, the
 * consumer is evicted from the group (rebalance) and the subsequent
 * {@code commitSync()} throws {@code CommitFailedException}. The
 * {@code Thread.sleep} below simulates slow message processing to trigger it.
 *
 * @author jast
 * @date 2020/4/28 10:07
 */
public class CommitFailedExceptionDemo {

    static Log log = LogFactory.get();
    static final String KAFKA_BROKERS = "127.0.0.1:9092";
    static final String group = "test-consumer";
    static final String topic = "test2";
    // false => start from the earliest available offset for a new group
    static final boolean isLatest = false;

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        // Cap each poll at 10 records so per-batch processing time is predictable.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        // Manual offset commits: the commitSync() in the loop is what throws
        // CommitFailedException once the consumer has been rebalanced away.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // If total processing time for one poll's batch exceeds max.poll.interval.ms,
        // the consumer is considered dead, a rebalance occurs, and the Kafka consumer
        // throws CommitFailedException on the next commit. Kept deliberately low for
        // the demo. (NOTE(review): the 6s sleep per batch is below 10s; the exception
        // is expected to trip once several batches/processing delays accumulate —
        // confirm against the broker's rebalance logs.)
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);

        // try-with-resources guarantees the consumer leaves the group cleanly
        // if the loop exits via an exception (e.g. the expected CommitFailedException).
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe exactly once, to the configured topic. (The original code
            // called subscribe() twice; the second call replaces the first, silently
            // dropping the intended subscription.)
            consumer.subscribe(Arrays.asList(topic));

            while (true) {
                // poll(long) is deprecated; use the Duration overload.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("数据:{},offset:{},partition:{}", record.value(), record.offset(), record.partition());
                }
                // Simulate real message-processing work; this delay eventually pushes the
                // poll interval past max.poll.interval.ms and triggers the exception.
                Thread.sleep(6000L);
                consumer.commitSync();
            }
        }
    }
}
